package com.ssm.kafka.listern.producer;

import com.alibaba.fastjson.JSON;
import com.ssm.kafka.listern.constant.KafkaMesEnum;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * kafkaProducer模板
 * 	使用此模板发送消息
 */
//@Component
public class KafkaProducerServer{

	@Resource
	private KafkaTemplate<String, String> kafkaTemplate;

	// 使用分区
	private static String IFPARTITION_YES = "0";
	
	/**
	 * kafka发送消息模板
	 * @param topic 主题
	 * @param value	messageValue
	 * @param ifPartition 是否使用分区 0是\1不是
	 * @param partitionNum 分区数 如果是否使用分区为0,分区数必须大于0
	 * @param role 角色:bbc app erp...
	 */
	public Map<String,Object> sndMesForTemplate(String topic, Object value, String ifPartition, 
			Integer partitionNum, String role){
		String key = role+"-"+value.hashCode();
		String valueString = JSON.toJSONString(value);
		if(IFPARTITION_YES.equals(ifPartition)){
			//表示使用分区
			int partitionIndex = getPartitionIndex(key, partitionNum);
			ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, partitionIndex, key, valueString);
			return checkProRecord(result);
		}else{
			ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);
			return checkProRecord(result);
		}
	}

	/**
	 * 根据key值获取分区索引
	 * @param key:key
	 * @param partitionNum:分区索引
	 * @return 分区号
	 */
	private int getPartitionIndex(String key, int partitionNum){
	    return key == null ? new Random().nextInt(partitionNum) : Math.abs(key.hashCode()) % partitionNum;
	}
	
	/**
	 * 检查发送返回结果record
	 * @param res：kafka返回结果
	 * @return resultMap：结果
	 */
	private Map<String,Object> checkProRecord(ListenableFuture<SendResult<String, String>> res){
		Map<String,Object> resultMap = new HashMap<>(2);
		if(res!=null){
			try {
                //检查result结果集
                SendResult r = res.get();
				/*检查recordMetadata的offset数据，不检查producerRecord*/
				Long offsetIndex = r.getRecordMetadata().offset();
				if(offsetIndex >= 0){
                    resultMap.put("code", KafkaMesEnum.SUCCESS_CODE.getValue());
                    resultMap.put("message", KafkaMesEnum.SUCCESS_MES.getValue());
					return resultMap;
				}else{
                    resultMap.put("code", KafkaMesEnum.KAFKA_NO_OFFSET_CODE.getValue());
                    resultMap.put("message", KafkaMesEnum.KAFKA_NO_OFFSET_MES.getValue());
					return resultMap;
				}
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
                resultMap.put("code", KafkaMesEnum.KAFKA_SEND_ERROR_CODE.getValue());
                resultMap.put("message", KafkaMesEnum.KAFKA_SEND_ERROR_MES.getValue());
				return resultMap;
			}
		}else{
            resultMap.put("code", KafkaMesEnum.KAFKA_NO_RESULT_CODE.getValue());
            resultMap.put("message", KafkaMesEnum.KAFKA_NO_RESULT_MES.getValue());
			return resultMap;
		}
	}
	

}
